package thread;

import java.util.concurrent.atomic.AtomicInteger;
/**
 * Multi-threaded message processor based on Disruptor.
 * <p>
 * Holds an array of {@code DisruptorSingleProcessor} workers and hands each
 * submitted task to one of them in round-robin order, using a shared
 * {@link AtomicInteger} counter to pick the worker.
 * <p>
 * {@link #start()} must be called before {@link #put(ITask)} or
 * {@link #putTimerTask(ITimerTask)}; the worker array does not exist until then.
 *
 * @author jinmiao
 * 2014-9-12 2:19:54 PM
 */
public class DisruptorMultiProcessor implements IMessageProcessor {

	/** Workers created by {@link #start()}; {@code null} until then. */
	private DisruptorSingleProcessor[] processArray;
	/** Number of worker threads. */
	private final int threadNum;

	/** Round-robin counter shared by all submitting threads. */
	private final AtomicInteger index = new AtomicInteger();
	/** Thread group passed to every worker; its name is also the worker name prefix. */
	private final ThreadGroup tg;


	/**
	 * Creates a processor with the given worker count. No workers are created
	 * until {@link #start()} is called.
	 *
	 * @param threadName name of the thread group, also used as the prefix of each worker name
	 * @param threadNum  number of workers to create; must be positive
	 * @throws IllegalArgumentException if {@code threadNum} is not positive
	 */
	public DisruptorMultiProcessor(String threadName,int threadNum)
	{
		// Fail fast: zero would later divide by zero when selecting a worker,
		// and a negative value would fail the array allocation in start().
		if(threadNum <= 0)
		{
			throw new IllegalArgumentException("threadNum must be positive: " + threadNum);
		}
		this.threadNum = threadNum;
		tg = new ThreadGroup(threadName);
	}


	/**
	 * Creates and starts {@code threadNum} workers. Worker {@code i} is named
	 * after the thread group name followed by {@code i}.
	 */
	public void start()
	{
		processArray = new DisruptorSingleProcessor[threadNum];
		for(int i =0;i<threadNum;i++)
		{
			DisruptorSingleProcessor process = new DisruptorSingleProcessor(tg,tg.getName()+i);
			processArray[i] = process;
			process.start();
		}
	}

	/**
	 * Stops every worker. Does nothing if {@link #start()} was never called.
	 */
	public void stop() {
		if(processArray == null)
		{
			// Nothing was started, so there is nothing to stop.
			return;
		}
		for(DisruptorSingleProcessor process:processArray)
		{
			process.stop();
		}
	}

	/**
	 * Hands the task to the next worker in round-robin order.
	 *
	 * @param msg task to submit
	 */
	public void put(ITask msg)
	{
		nextProcessor().put(msg);
	}

	/**
	 * Always reports {@code false}; no capacity check is performed here.
	 *
	 * @return {@code false}
	 */
	public boolean isFull() {
		return false;
	}

	/**
	 * Hands the timer task to the next worker in round-robin order.
	 *
	 * @param timerTask timer task to submit
	 */
	public void putTimerTask(ITimerTask timerTask) {
		nextProcessor().putTimerTask(timerTask);
	}

	/**
	 * Picks the next worker in round-robin order.
	 *
	 * @return the worker that should receive the next task
	 */
	private DisruptorSingleProcessor nextProcessor()
	{
		int ticket = this.index.incrementAndGet();
		// The counter wraps to negative values after Integer.MAX_VALUE and the
		// remainder of a negative dividend is negative in Java, which would be
		// an invalid array index. Clearing the sign bit keeps the slot in range.
		int slot = (ticket & Integer.MAX_VALUE) % threadNum;
		return processArray[slot];
	}

}
